# -*- coding: utf-8 -*-
from datetime import timedelta
from utils.operators.spark_submit import SparkSubmitOperator

# Jinja-templated run date: execution_date shifted back 60 days, rendered as a
# CST date string via the project's custom `date_add` / `cst_ds` filters
# (presumably registered by an Airflow plugin — confirm).
# NOTE(review): the name suggests "T minus 2" but the offset here is -60 days.
t_sub_2_dt = '{{ execution_date | date_add(-60) | cst_ds }}'

# Fully-qualified tables excluded from the small-file merge run.
# NOTE(review): the last two entries use "__" instead of "." between schema
# and table name — confirm whether that is intentional.
# NOTE(review): "dwd_wide_unsign_trace_wabill_step_dt" spells "wabill" (not
# "waybill"); verify against the actual Hive table name before "fixing" it.
_EXCLUDED_TABLES = (
    "jms_dwd.dwd_merger_small_files_info_dt",
    "jms_dwd.dwd_wide_summary_waybill_incre",
    "jms_dwd.dwd_wide_trace_waybill_step_incre",
    "jms_dwd.dwd_wide_barscan_operations_hist_dt",
    "jms_dwd.dwd_wide_sign_summary_waybill_dt",
    "jms_dwd.dwd_wide_sign_trace_waybill_step_dt",
    "jms_dwd.dwd_wide_unsign_summary_waybill_dt",
    "jms_dwd.dwd_wide_unsign_trace_wabill_step_dt",
    "jms_dwd.dwd_appeal_info_dt",
    "jms_dwd.dwd_yl_oms_oms_waybill_incre_hi",
    "jms_dwd.dwd_yl_oms_oms_waybill_incre_dt",
    "jms_dwd.dwd_second_package_list_base_dt",
    "jms_dwd.dwd_offline_complaint_dt",
    "jms_dwd.dwd_project_customer_waybill_work_order_mid_dt",
    "jms_dwd.dwd_yl_oms_interceptorpiece_base_hi",
    "jms_dwd.dwd_third_package_list_base_hi",
    "jms_dwd.dwd_tab_reback_transfer_express_base_hi",
    "jms_dwd.dwd_adjudication_base_dt",
    "jms_dwd.dwd_project_work_order_new_base_dt",
    "jms_dwd.dwd_tab_electronic_package_list_base_dt",
    "jms_dwd.dwd_sqs_problem_express_wo_log_base_dt",
    "jms_dwd.dwd_claim_work_order_base_hi",
    "jms_dwd.dwd_yl_oms_oms_order_incre_dt",
    "jms_dwd.dwd_yl_oms_pick_bill_base_dt",
    "jms_dwd.dwd_tmsnew_shipment_union_base_dt",
    "jms_dwd.dwd_yl_oms_interceptorpiece_base_dt",
    "jms_dwd.dwd_third_package_list_base_dt",
    "jms_dwd.dwd_tab_end_piece_base_hi",
    "jms_dwd.dwd_tab_barscan_transfer_mail_base_dt",
    "jms_dwd.dwd_arbitration_dt",
    "jms_dwd.dwd_wide_abnormal_detail_waybill_dt",
    "jms_dwd.dwd_sqs_problem_express_wo_base_dt",
    "jms_dwd.dwd_tab_reback_transfer_express_base_dt",
    "jms_dwd.dwd_project_work_order_dt",
    "jms_dwd.dwd_tmsnew_shipment_stop_union_base_dt",
    "jms_dwd.dwd_tab_end_piece_base_dt",
    "jms_dwd.dwd_work_order_dt",
    "jms_dwd.dwd_complaint_info_dt",
    "jms_dwd.dwd_high_risk_user_base_dt",
    "jms_dwd.dwd_project_work_order_base_dt",
    "jms_dwd.dwd_problem_express_wo_dt",
    "jms_dwd.dwd_tmsnew_shipment_stop_union_base_hi",
    "jms_dwd.dwd_arbitration_base_dt",
    "jms_dwd.dwd_tmsnew_shipment_union_base_hi",
    "jms_dwd.dwd_tms_temp_transport_base_dt",
    "jms_dwd.dwd_claim_work_order_new_base_hi",
    "jms_dwd__dwd_terminal_sign_all_detail_new_dt",
    "jms_dwd__dwd_terminal_send_dt",
)

# JSON-array string substituted into the job config's excludeTbList field.
ex_tbl_list = '[%s]' % ','.join('"%s"' % tbl for tbl in _EXCLUDED_TABLES)

# JSON configuration string handed to the merger Spark job as its single
# command-line argument.  The template carries three textual placeholders
# (start_date, end_date, ex_tbl_list) that are substituted with the concrete
# run values below; both partition bounds resolve to the same T-60 date, so
# the job processes a single day's partitions.
# NOTE(review): this variable shadows the stdlib `json` module name; rename it
# (together with its use in application_args) if `import json` is ever added.
_MERGER_CONF_TEMPLATE = """{
    "mode":"database",
    "db_or_tbWithDB":"jms_dwd",
    "numThreads":30,
    "env":"pro",
    "excludeTbList":ex_tbl_list,
    "realTimeHdfsLocations":[],
    "saveMergerInfo":1,
    "continueDays":2,
    "topN":20,
    "mergerInfoInsertTbName":"jms_dwd.dwd_merger_small_files_info_dt",
    "partitionStart":"start_date",
    "partitionEnd":"end_date"
}"""

json = _MERGER_CONF_TEMPLATE
for _placeholder, _value in (
        ("start_date", t_sub_2_dt),
        ("end_date", t_sub_2_dt),
        ("ex_tbl_list", ex_tbl_list),
):
    json = json.replace(_placeholder, _value)

# Per-task Spark configuration: dynamic allocation capped at 60 executors,
# dynamic partition overwrite for the merge-rewrite, and a single YARN
# attempt so failures surface straight to Airflow's own retry handling.
_MERGER_SPARK_CONF = {
    'spark.dynamicAllocation.enabled': 'true',
    'spark.shuffle.service.enabled': 'true',
    'spark.dynamicAllocation.maxExecutors': 60,
    'spark.dynamicAllocation.cachedExecutorIdleTimeout': 60,
    'spark.sql.sources.partitionOverwriteMode': 'dynamic',
    'spark.executor.memoryOverhead': '2G',
    'spark.sql.shuffle.partitions': 600,
    'spark.yarn.maxAppAttempts': 1,
}

# Small-file merge job for the jms_dwd database: one Spark submission driven
# by the rendered JSON config string above.
jms_dwd_merger = SparkSubmitOperator(
    # Airflow task identity and alerting.
    task_id='jms_dwd_merger',
    email=['lukunming@jtexpress.com', 'yl_bigdata@yl-scm.com'],
    name='jms_dwd_merger_{{ execution_date | date_add(1) | cst_ds }}',
    pool_slots=4,
    retries=1,
    execution_timeout=timedelta(hours=2),
    # Spark resource sizing.
    driver_cores=2,
    driver_memory='8G',
    executor_cores=4,
    executor_memory='6G',
    num_executors=50,
    conf=_MERGER_SPARK_CONF,
    # Artifacts on HDFS: dependency jar, Spark main class, application jar.
    jars='hdfs:///scheduler/jms/spark/wangbiao/small_files_merger/common-1.0-SNAPSHOT.jar',
    java_class='com.yunlu.bigdata.jobs.export.mergerSmallFile.SmallFileMergerTest4',
    application='hdfs:///scheduler/jms/spark/wangbiao/small_files_merger/original-jobs-1.0-SNAPSHOT.jar',
    yarn_queue='pro',
    # The rendered JSON config string is the job's only argument.
    application_args=[json],
)
